#ifndef ODYSSEY_RELAY_H
#define ODYSSEY_RELAY_H

/*
 * Odyssey.
 *
 * Scalable PostgreSQL connection pooler.
*/

/* Relay state; the structure itself is defined in this header. */
typedef struct od_relay od_relay_t;

/*
 * Packet callback.
 *
 * Invoked with a pointer to packet data and its size. The data may be a
 * complete packet or only the leading fragment of one (see
 * od_relay_process()). The returned status decides what happens to the
 * data: OD_OK and OD_DETACH queue it for writing, OD_SKIP drops it, any
 * other status is passed back to the caller without queueing.
 */
typedef od_status_t (*od_relay_on_packet_t)(od_relay_t*, char *data, int size);

/*
 * Read callback.
 *
 * Invoked by od_relay_read() after a successful read, with the number of
 * bytes that were received.
 */
typedef void        (*od_relay_on_read_t)(od_relay_t*, int size);

/*
 * Relay state: reads packets from `src`, passes them through the
 * on_packet callback and queues them in `iov` for writing to `dst`.
 */
struct od_relay
{
	/* bytes of the current packet that have not been received yet;
	 * zero means the next byte starts a new packet */
	int                   packet;
	/* set when on_packet returned OD_SKIP for the start of a fragmented
	 * packet: the remaining fragments are not queued for writing */
	int                   packet_skip;
	/* buffer accumulating a fragmented packet that must be handed to
	 * on_packet as a whole; NULL when no such packet is in progress */
	machine_msg_t        *packet_full;
	/* number of bytes already copied into packet_full */
	int                   packet_full_pos;
	/* data queued for writing to dst; created by od_relay_start() */
	machine_iov_t        *iov;
	/* condition passed to od_relay_start(); the src on_read/on_write
	 * conditions are propagated to it */
	machine_cond_t       *base;
	/* io the relay reads from */
	od_io_t              *src;
	/* io the relay writes to; NULL while detached */
	od_io_t              *dst;
	/* status returned when reading from src fails */
	od_status_t           error_read;
	/* status returned when writing to dst fails */
	od_status_t           error_write;
	/* per-packet callback and its user argument (the argument is only
	 * stored here; it is not passed to the callback by this header) */
	od_relay_on_packet_t  on_packet;
	void                 *on_packet_arg;
	/* callback invoked after each successful read, and its user argument
	 * (stored only, as above) */
	od_relay_on_read_t    on_read;
	void                 *on_read_arg;
};

/*
 * Put the relay into its initial state.
 *
 * The relay reads from `io`; it has no destination, no callbacks and no
 * buffers until od_relay_start() and od_relay_attach() are called. Both
 * error statuses start out as OD_UNDEF.
 */
static inline void
od_relay_init(od_relay_t *relay, od_io_t *io)
{
	/* every member not named here is zero-initialized (0 / NULL) */
	*relay = (od_relay_t){
		.src         = io,
		.error_read  = OD_UNDEF,
		.error_write = OD_UNDEF
	};
}

/*
 * Release the buffers owned by the relay: the partially accumulated
 * packet (if any) and the write queue (if any).
 *
 * The pointers are reset after freeing so that calling this function
 * again, or touching the relay afterwards, cannot double-free or use
 * freed memory.
 */
static inline void
od_relay_free(od_relay_t *relay)
{
	if (relay->packet_full) {
		machine_msg_free(relay->packet_full);
		relay->packet_full     = NULL;
		relay->packet_full_pos = 0;
	}
	if (relay->iov) {
		machine_iov_free(relay->iov);
		relay->iov = NULL;
	}
}

/*
 * Configure the relay and start reading from its source.
 *
 * Stores the error statuses and callbacks, creates the write queue,
 * propagates the source read/write conditions to `base` and starts
 * reading on the source io.
 *
 * Returns OD_OK on success, OD_EOOM if the write queue cannot be
 * created, or `error_read` if reading cannot be started.
 */
static inline od_status_t
od_relay_start(od_relay_t *relay,
               machine_cond_t       *base,
               od_status_t           error_read,
               od_status_t           error_write,
               od_relay_on_read_t    on_read,
               void                 *on_read_arg,
               od_relay_on_packet_t  on_packet,
               void                 *on_packet_arg)
{
	/* remember the caller's configuration */
	relay->base          = base;
	relay->error_read    = error_read;
	relay->error_write   = error_write;
	relay->on_read       = on_read;
	relay->on_read_arg   = on_read_arg;
	relay->on_packet     = on_packet;
	relay->on_packet_arg = on_packet_arg;

	/* write queue */
	relay->iov = machine_iov_create();
	if (relay->iov == NULL)
		return OD_EOOM;

	/* route source events to the base condition */
	od_io_t *source = relay->src;
	machine_cond_propagate(source->on_read, base);
	machine_cond_propagate(source->on_write, base);

	int started = machine_read_start(source->io, source->on_read);
	if (started == -1)
		return relay->error_read;

	return OD_OK;
}

/*
 * Set the io the relay writes to.
 *
 * The relay must not have a destination already (asserted).
 */
static inline void
od_relay_attach(od_relay_t *relay, od_io_t *dst)
{
	assert(! relay->dst);
	relay->dst = dst;
}

/*
 * Drop the current destination, stopping writes on its io first.
 * Does nothing when no destination is attached.
 */
static inline void
od_relay_detach(od_relay_t *relay)
{
	od_io_t *target = relay->dst;
	if (target != NULL) {
		machine_write_stop(target->io);
		relay->dst = NULL;
	}
}

/*
 * Stop the relay: detach the destination (if any) and stop reading
 * from the source io. Always returns 0.
 */
static inline int
od_relay_stop(od_relay_t *relay)
{
	/* write side first, then the read side */
	od_relay_detach(relay);

	od_io_t *source = relay->src;
	machine_read_stop(source->io);

	return 0;
}

/*
 * Tell whether the packet starting at `data` has to be accumulated in
 * full before it is handed to the packet callback.
 *
 * `data` must point at a packet header. Returns 1 for ParameterStatus,
 * ReadyForQuery and ErrorResponse packets, 0 for everything else.
 */
static inline int
od_relay_full_packet_required(char *data)
{
	const kiwi_header_t *hdr = (const kiwi_header_t*)data;

	return hdr->type == KIWI_BE_PARAMETER_STATUS ||
	       hdr->type == KIWI_BE_READY_FOR_QUERY  ||
	       hdr->type == KIWI_BE_ERROR_RESPONSE;
}

/*
 * Deliver a fully accumulated packet to the packet callback.
 *
 * `msg` is consumed by this function in every handled case:
 *  - OD_OK / OD_DETACH: the message is appended to the write queue and
 *    the callback status is returned (OD_EOOM if it cannot be queued);
 *  - OD_SKIP: the message is dropped and OD_OK is returned, matching
 *    what od_relay_on_packet() does for unbuffered packets, so that
 *    OD_SKIP never leaks out of the relay as a failure status;
 *  - anything else: the message is freed and the status is returned.
 */
static inline od_status_t
od_relay_on_packet_msg(od_relay_t *relay, machine_msg_t *msg)
{
	int rc;
	od_status_t status;
	status = relay->on_packet(relay, machine_msg_data(msg),
	                          machine_msg_size(msg));
	switch (status) {
	case OD_OK:
	case OD_DETACH:
		rc = machine_iov_add(relay->iov, msg);
		if (rc == -1) {
			/* NOTE(review): if machine_iov_add() does not take
			 * ownership of msg on failure, msg is leaked here;
			 * confirm against the machinarium implementation
			 * before adding a free. */
			return OD_EOOM;
		}
		break;
	case OD_SKIP:
		/* the whole packet is already here, so there are no further
		 * fragments to skip: just drop it and carry on */
		machine_msg_free(msg);
		status = OD_OK;
		break;
	default:
		machine_msg_free(msg);
		break;
	}
	return status;
}

/*
 * Deliver packet data that still lives in the read buffer to the packet
 * callback and act on its verdict.
 *
 *  - OD_SKIP: mark the packet as skipped (later fragments will not be
 *    queued) and report OD_OK;
 *  - OD_OK / OD_DETACH: queue the data for writing by pointer, without
 *    copying; returns OD_EOOM if it cannot be queued;
 *  - anything else: returned unchanged, nothing is queued.
 */
static inline od_status_t
od_relay_on_packet(od_relay_t *relay, char *data, int size)
{
	od_status_t verdict = relay->on_packet(relay, data, size);

	if (verdict == OD_SKIP) {
		relay->packet_skip = 1;
		return OD_OK;
	}

	if (verdict == OD_OK || verdict == OD_DETACH) {
		int added = machine_iov_add_pointer(relay->iov, data, size);
		if (added == -1)
			return OD_EOOM;
	}

	return verdict;
}

/*
 * Consume one packet, or one fragment of a packet, from `data`.
 *
 * relay    - relay state; `packet` tracks how many bytes of the current
 *            packet are still expected
 * progress - out: number of bytes of `data` that were consumed
 * data     - unread bytes in the read buffer
 * size     - number of bytes available at `data`
 *
 * Returns:
 *  - OD_UNDEF when a new packet starts but the header is not complete
 *    yet (nothing is consumed);
 *  - relay->error_read when the header carries an impossible length
 *    (nothing is consumed);
 *  - OD_EOOM on allocation / queueing failure;
 *  - otherwise OD_OK or the status produced by the packet callback.
 */
__attribute__((hot)) static inline od_status_t
od_relay_process(od_relay_t *relay, int *progress, char *data, int size)
{
	*progress = 0;

	/* on packet start */
	int rc;
	if (relay->packet == 0)
	{
		/* wait until the whole header is available */
		if (size < (int)sizeof(kiwi_header_t))
			return OD_UNDEF;

		int body;
		body = kiwi_read_size(data, sizeof(kiwi_header_t));

		int total = sizeof(kiwi_header_t) + body;

		/*
		 * The length comes from the peer. A negative body size, or a
		 * total that wrapped around, would make `total` smaller than
		 * the header itself; consuming such a packet would move the
		 * read position by zero or backwards and stall or corrupt the
		 * pipeline. Treat it as a read failure.
		 */
		if (body < 0 || total < (int)sizeof(kiwi_header_t))
			return relay->error_read;

		/* the complete packet is already in the buffer */
		if (size >= total) {
			*progress = total;
			return od_relay_on_packet(relay, data, total);
		}

		/* only the first fragment is here: consume it all and
		 * remember how much of the packet is still to come */
		*progress = size;

		relay->packet      = total - size;
		relay->packet_skip = 0;

		/* packets that need not be seen in full are forwarded
		 * fragment by fragment */
		rc = od_relay_full_packet_required(data);
		if (! rc)
			return od_relay_on_packet(relay, data, size);

		/* otherwise start accumulating the packet in its own buffer */
		relay->packet_full = machine_msg_create(total);
		if (relay->packet_full == NULL)
			return OD_EOOM;
		char *dest;
		dest = machine_msg_data(relay->packet_full);
		memcpy(dest, data, size);
		relay->packet_full_pos = size;
		return OD_OK;
	}

	/* chunk */
	int to_parse = relay->packet;
	if (to_parse > size)
		to_parse = size;
	*progress = to_parse;
	relay->packet -= to_parse;

	if (relay->packet_full)
	{
		/* append to the accumulated packet; deliver it once complete */
		char *dest;
		dest = machine_msg_data(relay->packet_full);
		memcpy(dest + relay->packet_full_pos, data, to_parse);
		relay->packet_full_pos += to_parse;
		if (relay->packet > 0)
			return OD_OK;
		/* ownership of the message moves to od_relay_on_packet_msg() */
		machine_msg_t *msg = relay->packet_full;
		relay->packet_full = NULL;
		relay->packet_full_pos = 0;
		return od_relay_on_packet_msg(relay, msg);
	} else {
		/* the callback asked to skip this packet: drop the fragment */
		if (relay->packet_skip)
			return OD_OK;
		rc = machine_iov_add_pointer(relay->iov, data, to_parse);
		if (rc == -1)
			return OD_EOOM;
	}

	return OD_OK;
}

/*
 * Process everything that has been read but not yet parsed.
 *
 * Feeds the unread part of the source readahead buffer to
 * od_relay_process() packet by packet, advancing the read position by
 * the amount each call consumed.
 *
 * Stops at the first status other than OD_OK and returns it, except
 * that OD_UNDEF (incomplete header, more data needed) is reported as
 * OD_OK. Returns OD_OK when the buffer has been fully processed.
 */
static inline od_status_t
od_relay_pipeline(od_relay_t *relay)
{
	char *cursor = od_readahead_pos_read(&relay->src->readahead);
	char *limit  = od_readahead_pos(&relay->src->readahead);

	while (cursor < limit)
	{
		int consumed;
		int status;
		status = od_relay_process(relay, &consumed, cursor, limit - cursor);

		/* account for the consumed bytes before looking at the status */
		cursor += consumed;
		od_readahead_pos_read_advance(&relay->src->readahead, consumed);

		if (status == OD_OK)
			continue;

		return (status == OD_UNDEF) ? OD_OK : status;
	}

	return OD_OK;
}

/*
 * Read as much as fits into the free part of the source readahead
 * buffer.
 *
 * Returns OD_OK when data was read, when the buffer has no free space,
 * or when the read should simply be retried later (EAGAIN, EWOULDBLOCK,
 * EINTR). Returns relay->error_read on any other failure or on EOF.
 * On success the on_read callback is invoked with the byte count.
 */
static inline od_status_t
od_relay_read(od_relay_t *relay)
{
	int room = od_readahead_left(&relay->src->readahead);
	if (room == 0)
		return OD_OK;

	char *target = od_readahead_pos(&relay->src->readahead);

	int received = machine_read_raw(relay->src->io, target, room);
	if (received > 0) {
		od_readahead_pos_advance(&relay->src->readahead, received);

		/* update recv stats */
		relay->on_read(relay, received);
		return OD_OK;
	}

	/* nothing was read: decide between retry and failure */
	int errno_ = machine_errno();
	if (errno_ == EAGAIN || errno_ == EWOULDBLOCK || errno_ == EINTR)
		return OD_OK;

	/* error or eof */
	return relay->error_read;
}

/*
 * Try to write the queued data to the destination.
 *
 * A destination must be attached (asserted). Returns OD_OK when there
 * is nothing to write, when the write call did not fail, or when it
 * should be retried later (EAGAIN, EWOULDBLOCK, EINTR); returns
 * relay->error_write on any other failure.
 */
static inline od_status_t
od_relay_write(od_relay_t *relay)
{
	assert(relay->dst);

	if (! machine_iov_pending(relay->iov))
		return OD_OK;

	int written = machine_writev_raw(relay->dst->io, relay->iov);
	if (written >= 0)
		return OD_OK;

	/* retry or error */
	int errno_ = machine_errno();
	if (errno_ == EAGAIN || errno_ == EWOULDBLOCK || errno_ == EINTR)
		return OD_OK;

	return relay->error_write;
}

/*
 * Run one iteration of the relay: handle a pending read event on the
 * source, then a pending write event on the destination.
 *
 * Returns OD_OK when the iteration completed, or the first non-OK
 * status produced by reading, packet processing or writing
 * (relay->error_read / relay->error_write for io failures).
 */
static inline od_status_t
od_relay_step(od_relay_t *relay)
{
	/* on read event */
	int rc;
	if (machine_cond_try(relay->src->on_read))
	{
		/* pull new bytes into the readahead buffer */
		rc = od_relay_read(relay);
		if (rc != OD_OK)
			return rc;

		/* parse them and queue what has to be forwarded */
		rc = od_relay_pipeline(relay);
		if (rc != OD_OK)
			return rc;

		if (machine_iov_pending(relay->iov)) {
			/* there is data to send: wake up the write side, if a
			 * destination is attached */
			if (relay->dst)
				machine_cond_signal(relay->dst->on_write);
		} else {
			/* nothing is queued, so the readahead buffer can be
			 * reused (presumably nothing references its contents
			 * at this point; see od_readahead_reuse()) */
			od_readahead_reuse(&relay->src->readahead);
		}
	}

	/* without a destination there is nothing to write to */
	if (relay->dst == NULL)
		return OD_OK;

	/* on write event */
	if (machine_cond_try(relay->dst->on_write))
	{
		rc = od_relay_write(relay);
		if (rc != OD_OK)
			return rc;

		if (! machine_iov_pending(relay->iov))
		{
			/* everything has been written: stop write events,
			 * reuse the readahead buffer and (re)start reading */
			rc = machine_write_stop(relay->dst->io);
			if (rc == -1)
				return relay->error_write;

			od_readahead_reuse(&relay->src->readahead);

			rc = machine_read_start(relay->src->io, relay->src->on_read);
			if (rc == -1)
				return relay->error_read;
		} else {
			/* data is still queued: ask for write events so the
			 * rest is sent on a later step */
			rc = machine_write_start(relay->dst->io, relay->dst->on_write);
			if (rc == -1)
				return relay->error_write;
		}
	}

	return OD_OK;
}

/*
 * Write out everything that is queued for the destination, waiting on
 * the destination write condition as long as data remains.
 *
 * Returns OD_OK when nothing is attached or queued, or once the queue
 * has been drained; returns the failing status of od_relay_write(), or
 * relay->error_write if write events cannot be started or stopped.
 */
static inline od_status_t
od_relay_flush(od_relay_t *relay)
{
	/* nothing to do without a destination or without queued data */
	if (relay->dst == NULL || ! machine_iov_pending(relay->iov))
		return OD_OK;

	/* first attempt, without waiting */
	int rc = od_relay_write(relay);
	if (rc != OD_OK)
		return rc;

	if (! machine_iov_pending(relay->iov))
		return OD_OK;

	/* data is left over: subscribe to write events and keep writing */
	rc = machine_write_start(relay->dst->io, relay->dst->on_write);
	if (rc == -1)
		return relay->error_write;

	while (machine_iov_pending(relay->iov))
	{
		machine_cond_wait(relay->dst->on_write, UINT32_MAX);

		rc = od_relay_write(relay);
		if (rc != OD_OK) {
			machine_write_stop(relay->dst->io);
			return rc;
		}
	}

	rc = machine_write_stop(relay->dst->io);
	return (rc == -1) ? relay->error_write : OD_OK;
}

#endif /* ODYSSEY_RELAY_H */
